diff --git a/command/agent/monitor/monitor.go b/command/agent/monitor/monitor.go index 1d063ae1dc9..dcc8cc54c15 100644 --- a/command/agent/monitor/monitor.go +++ b/command/agent/monitor/monitor.go @@ -1,27 +1,34 @@ package monitor import ( + "fmt" "sync" + "time" log "github.com/hashicorp/go-hclog" ) +// Monitor provides a mechanism to stream logs using go-hclog +// InterceptLogger and SinkAdapter. It allows streaming of logs +// at a different log level than what is set on the logger. type Monitor struct { sync.Mutex - sink log.SinkAdapter - logger log.InterceptLogger - logCh chan []byte - index int - droppedCount int - bufSize int + sink log.SinkAdapter + logger log.InterceptLogger + logCh chan []byte + droppedCount int + bufSize int + droppedDuration time.Duration } +// New creates a new Monitor. Start must be called in order to actually start +// streaming logs func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor { sw := &Monitor{ - logger: logger, - logCh: make(chan []byte, buf), - index: 0, - bufSize: buf, + logger: logger, + logCh: make(chan []byte, buf), + bufSize: buf, + droppedDuration: 3 * time.Second, } opts.Output = sw @@ -31,15 +38,25 @@ func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor return sw } +// Start registers a sink on the monitors logger and starts sending +// received log messages over the returned channel. A non-nil +// sopCh can be used to deregister the sink and stop log streaming func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte { d.logger.RegisterSink(d.sink) logCh := make(chan []byte, d.bufSize) go func() { + defer close(logCh) for { select { case log := <-d.logCh: - logCh <- log + select { + case <-stopCh: + d.logger.DeregisterSink(d.sink) + close(d.logCh) + return + case logCh <- log: + } case <-stopCh: d.Lock() defer d.Unlock() @@ -51,6 +68,34 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte { } }() + go func() { + // loop and check for dropped messages + LOOP: + for { + select { + case <-stopCh: + break LOOP + case <-time.After(d.droppedDuration): + if d.droppedCount > 0 { + dropped := fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount) + select { + case d.logCh <- []byte(dropped): + default: + // Make room for dropped message + select { + case <-d.logCh: + d.droppedCount++ + dropped = fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount) + default: + } + d.logCh <- []byte(dropped) + } + d.droppedCount = 0 + } + } + } + }() + return logCh } @@ -67,10 +112,6 @@ func (d *Monitor) Write(p []byte) (n int, err error) { case d.logCh <- bytes: default: d.droppedCount++ - if d.droppedCount > 10 { - d.logger.Warn("Monitor dropped %d logs during monitor request", d.droppedCount) - d.droppedCount = 0 - } } return } diff --git a/command/agent/monitor/monitor_test.go b/command/agent/monitor/monitor_test.go index b513db44ebf..bb6cedaf224 100644 --- a/command/agent/monitor/monitor_test.go +++ b/command/agent/monitor/monitor_test.go @@ -1,11 +1,11 @@ package monitor import ( + "fmt" + "strings" "testing" "time" - "github.com/stretchr/testify/assert" - log "github.com/hashicorp/go-hclog" "github.com/stretchr/testify/require" ) @@ -38,6 +38,7 @@ func TestMonitor_Start(t *testing.T) { logger.Debug("test log") } +// Ensure number of dropped messages are logged func TestMonitor_DroppedMessages(t *testing.T) { t.Parallel() @@ -48,15 +49,39 @@ func TestMonitor_DroppedMessages(t *testing.T) { m := New(5, logger, &log.LoggerOptions{ Level: log.Debug, }) + m.droppedDuration = 5 * time.Millisecond doneCh := make(chan struct{}) defer close(doneCh) - m.Start(doneCh) + logCh := m.Start(doneCh) - for i := 0; i <= 9; i++ { - logger.Debug("test message") + for i := 0; i <= 100; i++ { + logger.Debug(fmt.Sprintf("test message %d", i)) } - assert.Greater(t, m.droppedCount, 0) + received := "" + + passed := make(chan struct{}) + go func() { + for { + select { + case recv := <-logCh: + received += string(recv) + if strings.Contains(received, "[WARN] Monitor dropped 90 logs during monitor request") { + close(passed) + } + } + } + }() + +TEST: + for { + select { + case <-passed: + break TEST + case <-time.After(1 * time.Second): + require.Fail(t, "expected to see warn dropped messages") + } + } }