Skip to content

Commit

Permalink
Merge pull request #375 from Random-Liu/fix-channel-close-issue
Browse files Browse the repository at this point in the history
Properly close channel when monitors exit.
  • Loading branch information
k8s-ci-robot authored Oct 25, 2019
2 parents 705cb01 + be7cc78 commit ad76b93
Show file tree
Hide file tree
Showing 9 changed files with 37 additions and 14 deletions.
6 changes: 5 additions & 1 deletion cmd/logcounter/log_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,11 @@ func main() {
fmt.Print(err)
os.Exit(int(types.Unknown))
}
actual := counter.Count()
actual, err := counter.Count()
if err != nil {
fmt.Print(err)
os.Exit(int(types.Unknown))
}
if actual >= fedo.Count {
fmt.Printf("Found %d matching logs, which meets the threshold of %d\n", actual, fedo.Count)
os.Exit(int(types.NonOK))
Expand Down
6 changes: 5 additions & 1 deletion pkg/custompluginmonitor/custom_plugin_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,11 @@ func (c *customPluginMonitor) monitorLoop() {

for {
select {
case result := <-resultChan:
case result, ok := <-resultChan:
if !ok {
glog.Errorf("Result channel closed: %s", c.configPath)
return
}
glog.V(3).Infof("Receive new plugin result for %s: %+v", c.configPath, result)
status := c.generateStatus(result)
glog.Infof("New status generated: %+v", status)
Expand Down
1 change: 1 addition & 0 deletions pkg/custompluginmonitor/plugin/plugin.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (p *Plugin) GetResultChan() <-chan cpmtypes.Result {
func (p *Plugin) Run() {
defer func() {
glog.Info("Stopping plugin execution")
close(p.resultChan)
p.tomb.Done()
}()

Expand Down
8 changes: 6 additions & 2 deletions pkg/logcounter/log_counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,15 @@ func NewJournaldLogCounter(options *options.LogCounterOptions) (types.LogCounter
}, nil
}

func (e *logCounter) Count() (count int) {
func (e *logCounter) Count() (count int, err error) {
start := e.clock.Now()
for {
select {
case log := <-e.logCh:
case log, ok := <-e.logCh:
if !ok {
err = fmt.Errorf("log channel closed unexpectedly")
return
}
// We only want to count events up until the time at which we started.
// Otherwise we would run forever
if start.Before(log.Timestamp) {
Expand Down
5 changes: 4 additions & 1 deletion pkg/logcounter/log_counter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,10 @@ func TestCount(t *testing.T) {
fakeClock.Step(2 * timeout)
}
}(tc.logs, logCh)
actualCount := counter.Count()
actualCount, err := counter.Count()
if err != nil {
t.Errorf("unexpected error %v", err)
}
if actualCount != tc.expectedCount {
t.Errorf("got %d; expected %d", actualCount, tc.expectedCount)
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/logcounter/types/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,5 +17,5 @@ limitations under the License.
package types

type LogCounter interface {
Count() int
Count() (int, error)
}
7 changes: 5 additions & 2 deletions pkg/systemlogmonitor/log_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,13 +125,16 @@ func (l *logMonitor) Stop() {

// monitorLoop is the main loop of log monitor.
func (l *logMonitor) monitorLoop() {
defer l.tomb.Done()
defer func() {
close(l.output)
l.tomb.Done()
}()
l.initializeStatus()
for {
select {
case log, ok := <-l.logCh:
if !ok {
glog.Errorf("Log channel closed")
glog.Errorf("Log channel closed: %s", l.configPath)
return
}
l.parseLog(log)
Expand Down
14 changes: 9 additions & 5 deletions pkg/systemlogmonitor/logwatchers/kmsg/log_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,21 +86,25 @@ func (k *kernelLogWatcher) Stop() {

// watchLoop is the main watch loop of kernel log watcher.
func (k *kernelLogWatcher) watchLoop() {
kmsgs := k.kmsgParser.Parse()
defer func() {
if err := k.kmsgParser.Close(); err != nil {
glog.Errorf("Failed to close kmsg parser: %v", err)
}
close(k.logCh)
k.tomb.Done()
}()
kmsgs := k.kmsgParser.Parse()

for {
select {
case <-k.tomb.Stopping():
glog.Infof("Stop watching kernel log")
if err := k.kmsgParser.Close(); err != nil {
glog.Errorf("Failed to close kmsg parser: %v", err)
}
return
case msg := <-kmsgs:
case msg, ok := <-kmsgs:
if !ok {
glog.Error("Kmsg channel closed")
return
}
glog.V(5).Infof("got kernel message: %+v", msg)
if msg.Message == "" {
continue
Expand Down
2 changes: 1 addition & 1 deletion test/e2e/lib/gce/instance.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func CreateInstance(instance Instance, imageName string, imageProject string) (I

p, err := instance.ComputeService.Projects.Get(instance.Project).Do()
if err != nil {
return instance, fmt.Errorf("failed to get project info %q", instance.Project)
return instance, fmt.Errorf("failed to get project info %q: %v", instance.Project, err)
}

i := &compute.Instance{
Expand Down

0 comments on commit ad76b93

Please sign in to comment.