Skip to content

Commit

Permalink
[aggregator] Use a single goroutine to emit metrics for all TCP clien…
Browse files Browse the repository at this point in the history
…t queues (#2991)
  • Loading branch information
vdarulis authored Dec 7, 2020
1 parent 676c172 commit 8e3a0c3
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 45 deletions.
47 changes: 13 additions & 34 deletions src/aggregator/client/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,6 @@ import (
"go.uber.org/zap"
)

const (
// By default we use 6 buckets for the queue size histogram metrics
// to achieve a good balance between metric granularity and overhead.
defaultQueueSizeNumBuckets = 6
)

var (
errInstanceQueueClosed = errors.New("instance queue is closed")
errWriterQueueFull = errors.New("writer queue is full")
Expand Down Expand Up @@ -101,6 +95,9 @@ type instanceQueue interface {
// Enqueue enqueues a data buffer.
Enqueue(buf protobuf.Buffer) error

// Size returns the number of items in the queue.
Size() int

// Close closes the queue, it blocks until the queue is drained.
Close() error
}
Expand Down Expand Up @@ -154,9 +151,11 @@ func newInstanceQueue(instance placement.Instance, opts Options) instanceQueue {
}
q.writeFn = q.conn.Write

q.wg.Add(2)
go q.drain()
go q.reportQueueSize(iOpts.ReportInterval())
q.wg.Add(1)
go func() {
defer q.wg.Done()
q.drain()
}()

return q
}
Expand Down Expand Up @@ -199,15 +198,15 @@ func (q *queue) Enqueue(buf protobuf.Buffer) error {
func (q *queue) Close() error {
q.Lock()
if q.closed {
q.Unlock()
return errInstanceQueueClosed
}
q.closed = true
close(q.doneCh)
close(q.bufCh)
q.Unlock()

close(q.doneCh)
q.wg.Wait()

close(q.bufCh)
return nil
}

Expand All @@ -230,7 +229,6 @@ func (q *queue) writeAndReset() {
}

func (q *queue) drain() {
defer q.wg.Done()
defer q.conn.Close()
timer := time.NewTimer(q.batchFlushDeadline)
lastDrain := time.Now()
Expand Down Expand Up @@ -271,24 +269,11 @@ func (q *queue) drain() {
}
}

func (q *queue) reportQueueSize(reportInterval time.Duration) {
defer q.wg.Done()

ticker := time.NewTicker(reportInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
q.metrics.queueLen.RecordValue(float64(len(q.bufCh)))
case <-q.doneCh:
return
}
}
func (q *queue) Size() int {
return len(q.bufCh)
}

type queueMetrics struct {
queueLen tally.Histogram
enqueueSuccesses tally.Counter
enqueueOldestDropped tally.Counter
enqueueCurrentDropped tally.Counter
Expand All @@ -298,15 +283,9 @@ type queueMetrics struct {
}

func newQueueMetrics(s tally.Scope, queueSize int) queueMetrics {
numBuckets := defaultQueueSizeNumBuckets
if queueSize < numBuckets {
numBuckets = queueSize
}
buckets := tally.MustMakeLinearValueBuckets(0, float64(queueSize/numBuckets), numBuckets)
enqueueScope := s.Tagged(map[string]string{"action": "enqueue"})
connWriteScope := s.Tagged(map[string]string{"action": "conn-write"})
return queueMetrics{
queueLen: s.Histogram("queue-length", buckets),
enqueueSuccesses: enqueueScope.Counter("successes"),
enqueueOldestDropped: enqueueScope.Tagged(map[string]string{"drop-type": "oldest"}).
Counter("dropped"),
Expand Down
16 changes: 15 additions & 1 deletion src/aggregator/client/queue_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion src/aggregator/client/tcp_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -845,7 +845,7 @@ func testTCPClientOptions() Options {
plOpts := placement.NewStagedPlacementWatcherOptions().
SetStagedPlacementStore(store).
SetStagedPlacementKey(placementKey).
SetInitWatchTimeout(time.Nanosecond)
SetInitWatchTimeout(time.Millisecond)
return NewOptions().
SetClockOptions(clock.NewOptions()).
SetConnectionOptions(testConnectionOptions()).
Expand Down
7 changes: 7 additions & 0 deletions src/aggregator/client/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ type instanceWriter interface {
// Flush flushes any buffered metrics.
Flush() error

// QueueSize returns the size of the instance queue.
QueueSize() int

// Close closes the writer.
Close() error
}
Expand Down Expand Up @@ -154,6 +157,10 @@ func (w *writer) Close() error {
return w.queue.Close()
}

func (w *writer) QueueSize() int {
return w.queue.Size()
}

func (w *writer) encodeWithLock(
encoder *lockedEncoder,
payload payloadUnion,
Expand Down
1 change: 1 addition & 0 deletions src/aggregator/client/writer_benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,7 @@ type testNoOpQueue struct{}

func (q testNoOpQueue) Enqueue(protobuf.Buffer) error { return nil }
func (q testNoOpQueue) Close() error { return nil }
func (q testNoOpQueue) Size() int { return 0 }

type testSerialWriter struct {
*writer
Expand Down
56 changes: 53 additions & 3 deletions src/aggregator/client/writer_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"errors"
"fmt"
"sync"
"time"

"github.com/m3db/m3/src/cluster/placement"
xerrors "github.com/m3db/m3/src/x/errors"
Expand All @@ -35,6 +36,12 @@ var (
errInstanceWriterManagerClosed = errors.New("instance writer manager closed")
)

const (
_queueMetricReportInterval = 10 * time.Second
_queueMetricBuckets = 8
_queueMetricBucketStart = 64
)

// instanceWriterManager manages instance writers.
type instanceWriterManager interface {
// AddInstances adds instances.
Expand All @@ -60,34 +67,46 @@ type instanceWriterManager interface {
type writerManagerMetrics struct {
instancesAdded tally.Counter
instancesRemoved tally.Counter
queueLen tally.Histogram
}

func newWriterManagerMetrics(scope tally.Scope) writerManagerMetrics {
buckets := append(
tally.ValueBuckets{0},
tally.MustMakeExponentialValueBuckets(_queueMetricBucketStart, 2, _queueMetricBuckets)...,
)

return writerManagerMetrics{
instancesAdded: scope.Tagged(map[string]string{
"action": "add",
}).Counter("instances"),
instancesRemoved: scope.Tagged(map[string]string{
"action": "remove",
}).Counter("instances"),
queueLen: scope.Histogram("queue-length", buckets),
}
}

type writerManager struct {
sync.RWMutex

wg sync.WaitGroup
doneCh chan struct{}
opts Options
writers map[string]*refCountedWriter
closed bool
metrics writerManagerMetrics
}

func newInstanceWriterManager(opts Options) instanceWriterManager {
return &writerManager{
wm := &writerManager{
opts: opts,
writers: make(map[string]*refCountedWriter),
metrics: newWriterManagerMetrics(opts.InstrumentOptions().MetricsScope()),
doneCh: make(chan struct{}),
}
wm.wg.Add(1)
go wm.reportMetricsLoop()
return wm
}

func (mgr *writerManager) AddInstances(instances []placement.Instance) error {
Expand Down Expand Up @@ -173,14 +192,45 @@ func (mgr *writerManager) Flush() error {

func (mgr *writerManager) Close() error {
mgr.Lock()
defer mgr.Unlock()

if mgr.closed {
mgr.Unlock()
return errInstanceWriterManagerClosed
}

mgr.closed = true
for _, writer := range mgr.writers {
writer.Close()
}

close(mgr.doneCh)
mgr.Unlock()
mgr.wg.Wait()

return nil
}

func (mgr *writerManager) reportMetricsLoop() {
defer mgr.wg.Done()

ticker := time.NewTicker(_queueMetricReportInterval)
defer ticker.Stop()

for {
select {
case <-mgr.doneCh:
return
case <-ticker.C:
mgr.reportMetrics()
}
}
}

func (mgr *writerManager) reportMetrics() {
mgr.RLock()
defer mgr.RUnlock()

for _, writer := range mgr.writers {
mgr.metrics.queueLen.RecordValue(float64(writer.QueueSize()))
}
}
20 changes: 15 additions & 5 deletions src/aggregator/client/writer_mgr_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,12 @@ import (
"testing"
"time"

"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/x/clock"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/require"
"go.uber.org/goleak"

"github.com/m3db/m3/src/cluster/placement"
"github.com/m3db/m3/src/x/clock"
)

var (
Expand Down Expand Up @@ -145,6 +146,7 @@ func TestWriterManagerWriteUntimedNoInstances(t *testing.T) {
mgr.closed = false
err := mgr.Write(testPlacementInstance, 0, payload)
require.Error(t, err)
require.NoError(t, mgr.Close())
}

func TestWriterManagerWriteUntimedSuccess(t *testing.T) {
Expand All @@ -162,6 +164,7 @@ func TestWriterManagerWriteUntimedSuccess(t *testing.T) {
payloadRes payloadUnion
)
writer := NewMockinstanceWriter(ctrl)
writer.EXPECT().QueueSize().AnyTimes()
writer.EXPECT().
Write(gomock.Any(), gomock.Any()).
DoAndReturn(func(
Expand Down Expand Up @@ -214,6 +217,7 @@ func TestWriterManagerFlushPartialError(t *testing.T) {
)

writer1 := NewMockinstanceWriter(ctrl)
writer1.EXPECT().QueueSize().AnyTimes()
writer1.EXPECT().
Flush().
DoAndReturn(func() error {
Expand All @@ -222,6 +226,7 @@ func TestWriterManagerFlushPartialError(t *testing.T) {
})
errTestFlush := errors.New("test flush error")
writer2 := NewMockinstanceWriter(ctrl)
writer2.EXPECT().QueueSize().AnyTimes()
writer2.EXPECT().
Flush().
DoAndReturn(func() error {
Expand Down Expand Up @@ -249,6 +254,9 @@ func TestWriterManagerCloseAlreadyClosed(t *testing.T) {
}

func TestWriterManagerCloseSuccess(t *testing.T) {
opts := goleak.IgnoreCurrent() // TODO: other tests don't clean up properly
defer goleak.VerifyNone(t, opts)

mgr := newInstanceWriterManager(testOptions()).(*writerManager)

// Add instance list and close.
Expand All @@ -259,8 +267,10 @@ func TestWriterManagerCloseSuccess(t *testing.T) {
for _, w := range mgr.writers {
wr := w.instanceWriter.(*writer)
wr.Lock()
defer wr.Unlock()
if !wr.closed {
closed := wr.closed
wr.Unlock()

if !closed {
return false
}
}
Expand Down
16 changes: 15 additions & 1 deletion src/aggregator/client/writer_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit 8e3a0c3

Please sign in to comment.