Skip to content

Commit

Permalink
Addresses feedback around monitor implementation
Browse files Browse the repository at this point in the history
subselect on stopCh to prevent blocking forever.

Set up a separate goroutine to check every 3 seconds for dropped
messages.
  • Loading branch information
drewbailey committed Oct 30, 2019
1 parent 9b7f0ab commit cac0a6d
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 21 deletions.
71 changes: 56 additions & 15 deletions command/agent/monitor/monitor.go
Original file line number Diff line number Diff line change
@@ -1,27 +1,34 @@
package monitor

import (
"fmt"
"sync"
"time"

log "github.com/hashicorp/go-hclog"
)

// Monitor provides a mechanism to stream logs using go-hclog
// InterceptLogger and SinkAdapter. It allows streaming of logs
// at a different log level than what is set on the logger.
type Monitor struct {
sync.Mutex
sink log.SinkAdapter
logger log.InterceptLogger
logCh chan []byte
index int
droppedCount int
bufSize int
sink log.SinkAdapter
logger log.InterceptLogger
logCh chan []byte
droppedCount int
bufSize int
droppedDuration time.Duration
}

// New creates a new Monitor. Start must be called in order to actually start
// streaming logs
func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor {
sw := &Monitor{
logger: logger,
logCh: make(chan []byte, buf),
index: 0,
bufSize: buf,
logger: logger,
logCh: make(chan []byte, buf),
bufSize: buf,
droppedDuration: 3 * time.Second,
}

opts.Output = sw
Expand All @@ -31,15 +38,25 @@ func New(buf int, logger log.InterceptLogger, opts *log.LoggerOptions) *Monitor
return sw
}

// Start registers a sink on the monitors logger and starts sending
// received log messages over the returned channel. A non-nil
// sopCh can be used to deregister the sink and stop log streaming
func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
d.logger.RegisterSink(d.sink)

logCh := make(chan []byte, d.bufSize)
go func() {
defer close(logCh)
for {
select {
case log := <-d.logCh:
logCh <- log
select {
case <-stopCh:
d.logger.DeregisterSink(d.sink)
close(d.logCh)
return
case logCh <- log:
}
case <-stopCh:
d.Lock()
defer d.Unlock()
Expand All @@ -51,6 +68,34 @@ func (d *Monitor) Start(stopCh <-chan struct{}) <-chan []byte {
}
}()

go func() {
// loop and check for dropped messages
LOOP:
for {
select {
case <-stopCh:
break LOOP
case <-time.After(d.droppedDuration):
if d.droppedCount > 0 {
dropped := fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount)
select {
case d.logCh <- []byte(dropped):
default:
// Make room for dropped message
select {
case <-d.logCh:
d.droppedCount++
dropped = fmt.Sprintf("[WARN] Monitor dropped %d logs during monitor request\n", d.droppedCount)
default:
}
d.logCh <- []byte(dropped)
}
d.droppedCount = 0
}
}
}
}()

return logCh
}

Expand All @@ -67,10 +112,6 @@ func (d *Monitor) Write(p []byte) (n int, err error) {
case d.logCh <- bytes:
default:
d.droppedCount++
if d.droppedCount > 10 {
d.logger.Warn("Monitor dropped %d logs during monitor request", d.droppedCount)
d.droppedCount = 0
}
}
return
}
37 changes: 31 additions & 6 deletions command/agent/monitor/monitor_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package monitor

import (
"fmt"
"strings"
"testing"
"time"

"github.com/stretchr/testify/assert"

log "github.com/hashicorp/go-hclog"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -38,6 +38,7 @@ func TestMonitor_Start(t *testing.T) {
logger.Debug("test log")
}

// Ensure number of dropped messages are logged
func TestMonitor_DroppedMessages(t *testing.T) {
t.Parallel()

Expand All @@ -48,15 +49,39 @@ func TestMonitor_DroppedMessages(t *testing.T) {
m := New(5, logger, &log.LoggerOptions{
Level: log.Debug,
})
m.droppedDuration = 5 * time.Millisecond

doneCh := make(chan struct{})
defer close(doneCh)

m.Start(doneCh)
logCh := m.Start(doneCh)

for i := 0; i <= 9; i++ {
logger.Debug("test message")
for i := 0; i <= 100; i++ {
logger.Debug(fmt.Sprintf("test message %d", i))
}

assert.Greater(t, m.droppedCount, 0)
received := ""

passed := make(chan struct{})
go func() {
for {
select {
case recv := <-logCh:
received += string(recv)
if strings.Contains(received, "[WARN] Monitor dropped 90 logs during monitor request") {
close(passed)
}
}
}
}()

TEST:
for {
select {
case <-passed:
break TEST
case <-time.After(1 * time.Second):
require.Fail(t, "expected to see warn dropped messages")
}
}
}

0 comments on commit cac0a6d

Please sign in to comment.