diff --git a/command/agent/monitor/monitor.go b/command/agent/monitor/monitor.go index 2c28a09f613..d788110c196 100644 --- a/command/agent/monitor/monitor.go +++ b/command/agent/monitor/monitor.go @@ -70,14 +70,16 @@ func new(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *monitor return sw } -// Stop stops the monitoring process +// Stop deregisters the sink and stops the monitoring process func (d *monitor) Stop() { + d.logger.DeregisterSink(d.sink) close(d.doneCh) } // Start registers a sink on the monitor's logger and starts sending // received log messages over the returned channel. func (d *monitor) Start() <-chan []byte { + // register our sink with the logger d.logger.RegisterSink(d.sink) streamCh := make(chan []byte, d.bufSize) @@ -85,10 +87,7 @@ func (d *monitor) Start() <-chan []byte { // run a go routine that listens for streamed // log messages and sends them to streamCh go func() { - defer func() { - d.logger.DeregisterSink(d.sink) - close(streamCh) - }() + defer close(streamCh) for { select { @@ -109,11 +108,10 @@ func (d *monitor) Start() <-chan []byte { // to add a dropped message count warning go func() { // loop and check for dropped messages - LOOP: for { select { case <-d.doneCh: - break LOOP + return case <-time.After(d.droppedDuration): d.Lock() @@ -122,7 +120,8 @@ func (d *monitor) Start() <-chan []byte { dropped := fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount) select { case <-d.doneCh: - break LOOP + d.Unlock() + return // Try sending dropped message count to logCh in case // there is room in the buffer now. case d.logCh <- []byte(dropped): @@ -168,5 +167,6 @@ func (d *monitor) Write(p []byte) (n int, err error) { default: d.droppedCount++ } - return + + return len(p), nil }