diff --git a/cmd/logcounter/log_counter.go b/cmd/logcounter/log_counter.go index 672d8be25..d3c03de25 100644 --- a/cmd/logcounter/log_counter.go +++ b/cmd/logcounter/log_counter.go @@ -44,7 +44,11 @@ func main() { fmt.Print(err) os.Exit(int(types.Unknown)) } - actual := counter.Count() + actual, err := counter.Count() + if err != nil { + fmt.Print(err) + os.Exit(int(types.Unknown)) + } if actual >= fedo.Count { fmt.Printf("Found %d matching logs, which meets the threshold of %d\n", actual, fedo.Count) os.Exit(int(types.NonOK)) diff --git a/pkg/custompluginmonitor/custom_plugin_monitor.go b/pkg/custompluginmonitor/custom_plugin_monitor.go index 66a19079e..8b9bf6d42 100644 --- a/pkg/custompluginmonitor/custom_plugin_monitor.go +++ b/pkg/custompluginmonitor/custom_plugin_monitor.go @@ -128,7 +128,11 @@ func (c *customPluginMonitor) monitorLoop() { for { select { - case result := <-resultChan: + case result, ok := <-resultChan: + if !ok { + glog.Errorf("Result channel closed: %s", c.configPath) + return + } glog.V(3).Infof("Receive new plugin result for %s: %+v", c.configPath, result) status := c.generateStatus(result) glog.Infof("New status generated: %+v", status) diff --git a/pkg/custompluginmonitor/plugin/plugin.go b/pkg/custompluginmonitor/plugin/plugin.go index 19fbd3ec8..ea6c5a2af 100644 --- a/pkg/custompluginmonitor/plugin/plugin.go +++ b/pkg/custompluginmonitor/plugin/plugin.go @@ -55,6 +55,7 @@ func (p *Plugin) GetResultChan() <-chan cpmtypes.Result { func (p *Plugin) Run() { defer func() { glog.Info("Stopping plugin execution") + close(p.resultChan) p.tomb.Done() }() diff --git a/pkg/logcounter/log_counter.go b/pkg/logcounter/log_counter.go index 94a84d547..7fd22e57a 100644 --- a/pkg/logcounter/log_counter.go +++ b/pkg/logcounter/log_counter.go @@ -63,11 +63,15 @@ func NewJournaldLogCounter(options *options.LogCounterOptions) (types.LogCounter }, nil } -func (e *logCounter) Count() (count int) { +func (e *logCounter) Count() (count int, err error) { start := e.clock.Now() for { select { - case log := <-e.logCh: + case log, ok := <-e.logCh: + if !ok { + err = fmt.Errorf("log channel closed unexpectedly") + return + } // We only want to count events up until the time at which we started. // Otherwise we would run forever if start.Before(log.Timestamp) { diff --git a/pkg/logcounter/log_counter_test.go b/pkg/logcounter/log_counter_test.go index a3baaf468..23ec27b0f 100644 --- a/pkg/logcounter/log_counter_test.go +++ b/pkg/logcounter/log_counter_test.go @@ -120,7 +120,10 @@ func TestCount(t *testing.T) { fakeClock.Step(2 * timeout) } }(tc.logs, logCh) - actualCount := counter.Count() + actualCount, err := counter.Count() + if err != nil { + t.Errorf("unexpected error %v", err) + } if actualCount != tc.expectedCount { t.Errorf("got %d; expected %d", actualCount, tc.expectedCount) } diff --git a/pkg/logcounter/types/types.go b/pkg/logcounter/types/types.go index 640d3a78d..455f5cae9 100644 --- a/pkg/logcounter/types/types.go +++ b/pkg/logcounter/types/types.go @@ -17,5 +17,5 @@ limitations under the License. package types type LogCounter interface { - Count() int + Count() (int, error) } diff --git a/pkg/systemlogmonitor/log_monitor.go b/pkg/systemlogmonitor/log_monitor.go index 0ce2730c6..ffe25f25f 100644 --- a/pkg/systemlogmonitor/log_monitor.go +++ b/pkg/systemlogmonitor/log_monitor.go @@ -125,13 +125,16 @@ func (l *logMonitor) Stop() { // monitorLoop is the main loop of log monitor. func (l *logMonitor) monitorLoop() { - defer l.tomb.Done() + defer func() { + close(l.output) + l.tomb.Done() + }() l.initializeStatus() for { select { case log, ok := <-l.logCh: if !ok { - glog.Errorf("Log channel closed") + glog.Errorf("Log channel closed: %s", l.configPath) return } l.parseLog(log) diff --git a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go index d5b66a0ef..5f8ed60f7 100644 --- a/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go +++ b/pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go @@ -86,21 +86,25 @@ func (k *kernelLogWatcher) Stop() { // watchLoop is the main watch loop of kernel log watcher. func (k *kernelLogWatcher) watchLoop() { + kmsgs := k.kmsgParser.Parse() defer func() { + if err := k.kmsgParser.Close(); err != nil { + glog.Errorf("Failed to close kmsg parser: %v", err) + } close(k.logCh) k.tomb.Done() }() - kmsgs := k.kmsgParser.Parse() for { select { case <-k.tomb.Stopping(): glog.Infof("Stop watching kernel log") - if err := k.kmsgParser.Close(); err != nil { - glog.Errorf("Failed to close kmsg parser: %v", err) - } return - case msg := <-kmsgs: + case msg, ok := <-kmsgs: + if !ok { + glog.Error("Kmsg channel closed") + return + } glog.V(5).Infof("got kernel message: %+v", msg) if msg.Message == "" { continue diff --git a/test/e2e/lib/gce/instance.go b/test/e2e/lib/gce/instance.go index 15deda688..69d135567 100644 --- a/test/e2e/lib/gce/instance.go +++ b/test/e2e/lib/gce/instance.go @@ -46,7 +46,7 @@ func CreateInstance(instance Instance, imageName string, imageProject string) (I p, err := instance.ComputeService.Projects.Get(instance.Project).Do() if err != nil { - return instance, fmt.Errorf("failed to get project info %q", instance.Project) + return instance, fmt.Errorf("failed to get project info %q: %v", instance.Project, err) } i := &compute.Instance{