diff --git a/src/aggregator/client/queue.go b/src/aggregator/client/queue.go index b3aadeb2b3..d1dbcb88b4 100644 --- a/src/aggregator/client/queue.go +++ b/src/aggregator/client/queue.go @@ -34,12 +34,6 @@ import ( "go.uber.org/zap" ) -const ( - // By default we use 6 buckets for the queue size histogram metrics - // to achieve a good balance between metric granularity and overhead. - defaultQueueSizeNumBuckets = 6 -) - var ( errInstanceQueueClosed = errors.New("instance queue is closed") errWriterQueueFull = errors.New("writer queue is full") @@ -101,6 +95,9 @@ type instanceQueue interface { // Enqueue enqueues a data buffer. Enqueue(buf protobuf.Buffer) error + // Size returns the number of items in the queue. + Size() int + // Close closes the queue, it blocks until the queue is drained. Close() error } @@ -154,9 +151,11 @@ func newInstanceQueue(instance placement.Instance, opts Options) instanceQueue { } q.writeFn = q.conn.Write - q.wg.Add(2) - go q.drain() - go q.reportQueueSize(iOpts.ReportInterval()) + q.wg.Add(1) + go func() { + defer q.wg.Done() + q.drain() + }() return q } @@ -199,15 +198,15 @@ func (q *queue) Enqueue(buf protobuf.Buffer) error { func (q *queue) Close() error { q.Lock() if q.closed { - q.Unlock() return errInstanceQueueClosed } q.closed = true - close(q.doneCh) - close(q.bufCh) q.Unlock() + close(q.doneCh) q.wg.Wait() + + close(q.bufCh) return nil } @@ -230,7 +229,6 @@ func (q *queue) writeAndReset() { } func (q *queue) drain() { - defer q.wg.Done() defer q.conn.Close() timer := time.NewTimer(q.batchFlushDeadline) lastDrain := time.Now() @@ -271,24 +269,11 @@ func (q *queue) drain() { } } -func (q *queue) reportQueueSize(reportInterval time.Duration) { - defer q.wg.Done() - - ticker := time.NewTicker(reportInterval) - defer ticker.Stop() - - for { - select { - case <-ticker.C: - q.metrics.queueLen.RecordValue(float64(len(q.bufCh))) - case <-q.doneCh: - return - } - } +func (q *queue) Size() int { + return len(q.bufCh) } type queueMetrics struct { - queueLen tally.Histogram enqueueSuccesses tally.Counter enqueueOldestDropped tally.Counter enqueueCurrentDropped tally.Counter @@ -298,15 +283,9 @@ type queueMetrics struct { } func newQueueMetrics(s tally.Scope, queueSize int) queueMetrics { - numBuckets := defaultQueueSizeNumBuckets - if queueSize < numBuckets { - numBuckets = queueSize - } - buckets := tally.MustMakeLinearValueBuckets(0, float64(queueSize/numBuckets), numBuckets) enqueueScope := s.Tagged(map[string]string{"action": "enqueue"}) connWriteScope := s.Tagged(map[string]string{"action": "conn-write"}) return queueMetrics{ - queueLen: s.Histogram("queue-length", buckets), enqueueSuccesses: enqueueScope.Counter("successes"), enqueueOldestDropped: enqueueScope.Tagged(map[string]string{"drop-type": "oldest"}). Counter("dropped"), diff --git a/src/aggregator/client/queue_mock.go b/src/aggregator/client/queue_mock.go index 906b90625a..b40fbeb653 100644 --- a/src/aggregator/client/queue_mock.go +++ b/src/aggregator/client/queue_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/aggregator/client/queue.go -// Copyright (c) 2018 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -69,6 +69,20 @@ func (mr *MockinstanceQueueMockRecorder) Enqueue(buf interface{}) *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Enqueue", reflect.TypeOf((*MockinstanceQueue)(nil).Enqueue), buf) } +// Size mocks base method +func (m *MockinstanceQueue) Size() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "Size") + ret0, _ := ret[0].(int) + return ret0 +} + +// Size indicates an expected call of Size +func (mr *MockinstanceQueueMockRecorder) Size() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Size", reflect.TypeOf((*MockinstanceQueue)(nil).Size)) +} + // Close mocks base method func (m *MockinstanceQueue) Close() error { m.ctrl.T.Helper() diff --git a/src/aggregator/client/tcp_client_test.go b/src/aggregator/client/tcp_client_test.go index a36a5c6020..b1e5db411d 100644 --- a/src/aggregator/client/tcp_client_test.go +++ b/src/aggregator/client/tcp_client_test.go @@ -845,7 +845,7 @@ func testTCPClientOptions() Options { plOpts := placement.NewStagedPlacementWatcherOptions(). SetStagedPlacementStore(store). SetStagedPlacementKey(placementKey). - SetInitWatchTimeout(time.Nanosecond) + SetInitWatchTimeout(time.Millisecond) return NewOptions(). SetClockOptions(clock.NewOptions()). SetConnectionOptions(testConnectionOptions()). diff --git a/src/aggregator/client/writer.go b/src/aggregator/client/writer.go index 4d94a69319..4db58ae8ea 100644 --- a/src/aggregator/client/writer.go +++ b/src/aggregator/client/writer.go @@ -51,6 +51,9 @@ type instanceWriter interface { // Flush flushes any buffered metrics. Flush() error + // QueueSize returns the size of the instance queue. + QueueSize() int + // Close closes the writer. Close() error } @@ -154,6 +157,10 @@ func (w *writer) Close() error { return w.queue.Close() } +func (w *writer) QueueSize() int { + return w.queue.Size() +} + func (w *writer) encodeWithLock( encoder *lockedEncoder, payload payloadUnion, diff --git a/src/aggregator/client/writer_benchmark_test.go b/src/aggregator/client/writer_benchmark_test.go index 37f5194e26..0a44d192b7 100644 --- a/src/aggregator/client/writer_benchmark_test.go +++ b/src/aggregator/client/writer_benchmark_test.go @@ -150,6 +150,7 @@ type testNoOpQueue struct{} func (q testNoOpQueue) Enqueue(protobuf.Buffer) error { return nil } func (q testNoOpQueue) Close() error { return nil } +func (q testNoOpQueue) Size() int { return 0 } type testSerialWriter struct { *writer diff --git a/src/aggregator/client/writer_mgr.go b/src/aggregator/client/writer_mgr.go index 7be59aeedc..249236902a 100644 --- a/src/aggregator/client/writer_mgr.go +++ b/src/aggregator/client/writer_mgr.go @@ -24,6 +24,7 @@ import ( "errors" "fmt" "sync" + "time" "github.com/m3db/m3/src/cluster/placement" xerrors "github.com/m3db/m3/src/x/errors" @@ -35,6 +36,12 @@ var ( errInstanceWriterManagerClosed = errors.New("instance writer manager closed") ) +const ( + _queueMetricReportInterval = 10 * time.Second + _queueMetricBuckets = 8 + _queueMetricBucketStart = 64 +) + // instanceWriterManager manages instance writers. type instanceWriterManager interface { // AddInstances adds instances. @@ -60,9 +67,15 @@ type instanceWriterManager interface { type writerManagerMetrics struct { instancesAdded tally.Counter instancesRemoved tally.Counter + queueLen tally.Histogram } func newWriterManagerMetrics(scope tally.Scope) writerManagerMetrics { + buckets := append( + tally.ValueBuckets{0}, + tally.MustMakeExponentialValueBuckets(_queueMetricBucketStart, 2, _queueMetricBuckets)..., + ) + return writerManagerMetrics{ instancesAdded: scope.Tagged(map[string]string{ "action": "add", @@ -70,12 +83,14 @@ func newWriterManagerMetrics(scope tally.Scope) writerManagerMetrics { instancesRemoved: scope.Tagged(map[string]string{ "action": "remove", }).Counter("instances"), + queueLen: scope.Histogram("queue-length", buckets), } } type writerManager struct { sync.RWMutex - + wg sync.WaitGroup + doneCh chan struct{} opts Options writers map[string]*refCountedWriter closed bool @@ -83,11 +98,15 @@ type writerManager struct { } func newInstanceWriterManager(opts Options) instanceWriterManager { - return &writerManager{ + wm := &writerManager{ opts: opts, writers: make(map[string]*refCountedWriter), metrics: newWriterManagerMetrics(opts.InstrumentOptions().MetricsScope()), + doneCh: make(chan struct{}), } + wm.wg.Add(1) + go wm.reportMetricsLoop() + return wm } func (mgr *writerManager) AddInstances(instances []placement.Instance) error { @@ -173,14 +192,45 @@ func (mgr *writerManager) Flush() error { func (mgr *writerManager) Close() error { mgr.Lock() - defer mgr.Unlock() if mgr.closed { + mgr.Unlock() return errInstanceWriterManagerClosed } + mgr.closed = true for _, writer := range mgr.writers { writer.Close() } + + close(mgr.doneCh) + mgr.Unlock() + mgr.wg.Wait() + return nil } + +func (mgr *writerManager) reportMetricsLoop() { + defer mgr.wg.Done() + + ticker := time.NewTicker(_queueMetricReportInterval) + defer ticker.Stop() + + for { + select { + case <-mgr.doneCh: + return + case <-ticker.C: + mgr.reportMetrics() + } + } +} + +func (mgr *writerManager) reportMetrics() { + mgr.RLock() + defer mgr.RUnlock() + + for _, writer := range mgr.writers { + mgr.metrics.queueLen.RecordValue(float64(writer.QueueSize())) + } +} diff --git a/src/aggregator/client/writer_mgr_test.go b/src/aggregator/client/writer_mgr_test.go index 8cfe5a1a88..17d962ef6d 100644 --- a/src/aggregator/client/writer_mgr_test.go +++ b/src/aggregator/client/writer_mgr_test.go @@ -26,11 +26,12 @@ import ( "testing" "time" - "github.com/m3db/m3/src/cluster/placement" - "github.com/m3db/m3/src/x/clock" - "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" + "go.uber.org/goleak" + + "github.com/m3db/m3/src/cluster/placement" + "github.com/m3db/m3/src/x/clock" ) var ( @@ -145,6 +146,7 @@ func TestWriterManagerWriteUntimedNoInstances(t *testing.T) { mgr.closed = false err := mgr.Write(testPlacementInstance, 0, payload) require.Error(t, err) + require.NoError(t, mgr.Close()) } func TestWriterManagerWriteUntimedSuccess(t *testing.T) { @@ -162,6 +164,7 @@ func TestWriterManagerWriteUntimedSuccess(t *testing.T) { payloadRes payloadUnion ) writer := NewMockinstanceWriter(ctrl) + writer.EXPECT().QueueSize().AnyTimes() writer.EXPECT(). Write(gomock.Any(), gomock.Any()). DoAndReturn(func( @@ -214,6 +217,7 @@ func TestWriterManagerFlushPartialError(t *testing.T) { ) writer1 := NewMockinstanceWriter(ctrl) + writer1.EXPECT().QueueSize().AnyTimes() writer1.EXPECT(). Flush(). DoAndReturn(func() error { @@ -222,6 +226,7 @@ func TestWriterManagerFlushPartialError(t *testing.T) { }) errTestFlush := errors.New("test flush error") writer2 := NewMockinstanceWriter(ctrl) + writer2.EXPECT().QueueSize().AnyTimes() writer2.EXPECT(). Flush(). DoAndReturn(func() error { @@ -249,6 +254,9 @@ func TestWriterManagerCloseAlreadyClosed(t *testing.T) { } func TestWriterManagerCloseSuccess(t *testing.T) { + opts := goleak.IgnoreCurrent() // TODO: other tests don't clean up properly + defer goleak.VerifyNone(t, opts) + mgr := newInstanceWriterManager(testOptions()).(*writerManager) // Add instance list and close. @@ -259,8 +267,10 @@ func TestWriterManagerCloseSuccess(t *testing.T) { for _, w := range mgr.writers { wr := w.instanceWriter.(*writer) wr.Lock() - defer wr.Unlock() - if !wr.closed { + closed := wr.closed + wr.Unlock() + + if !closed { return false } } diff --git a/src/aggregator/client/writer_mock.go b/src/aggregator/client/writer_mock.go index 23fd8363d8..7165e24778 100644 --- a/src/aggregator/client/writer_mock.go +++ b/src/aggregator/client/writer_mock.go @@ -1,7 +1,7 @@ // Code generated by MockGen. DO NOT EDIT. // Source: github.com/m3db/m3/src/aggregator/client/writer.go -// Copyright (c) 2018 Uber Technologies, Inc. +// Copyright (c) 2020 Uber Technologies, Inc. // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to deal @@ -81,6 +81,20 @@ func (mr *MockinstanceWriterMockRecorder) Flush() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Flush", reflect.TypeOf((*MockinstanceWriter)(nil).Flush)) } +// QueueSize mocks base method +func (m *MockinstanceWriter) QueueSize() int { + m.ctrl.T.Helper() + ret := m.ctrl.Call(m, "QueueSize") + ret0, _ := ret[0].(int) + return ret0 +} + +// QueueSize indicates an expected call of QueueSize +func (mr *MockinstanceWriterMockRecorder) QueueSize() *gomock.Call { + mr.mock.ctrl.T.Helper() + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "QueueSize", reflect.TypeOf((*MockinstanceWriter)(nil).QueueSize)) +} + // Close mocks base method func (m *MockinstanceWriter) Close() error { m.ctrl.T.Helper()